//package com.joysuccess.consumer.interceptor;
//
//import ch.qos.logback.core.net.SyslogOutputStream;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.kafka.clients.consumer.ConsumerInterceptor;
//import org.apache.kafka.clients.consumer.ConsumerRecord;
//import org.apache.kafka.clients.consumer.ConsumerRecords;
//import redis.clients.jedis.Jedis;
//
//import java.util.Map;
//@Slf4j
//public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String,String> {
//    private Jedis jedis = new Jedis("127.0.0.1",6379);
//    @Override
//    public ConsumerRecords<String,String> onConsume(ConsumerRecords<String,String> consumerRecords) {
//        long latency = 0L;
//        for(ConsumerRecord consumerRecord:consumerRecords){
//            latency += (System.currentTimeMillis()-consumerRecord.timestamp());
//        }
//        try {
//            jedis.incrBy("totalLatency", latency);
//            //以下这段代码我认为其实可以不用在这里计算，可以在另外一个地方做计算得出结论，避免降低吞吐量
//            long totalLatency = Long.parseLong(jedis.get("totalLatency"));
//            long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
//            jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
//        }catch (Exception e){
//           log.info("error: AvgLatencyConsumerInterceptor.onConsume() msg:"+e.getMessage());
//        }
//
//        return consumerRecords;
//    }
//
//    @Override
//    public void close() {
//
//    }
//
//    @Override
//    public void onCommit(Map map) {
//
//    }
//
//    @Override
//    public void configure(Map<String, ?> map) {
//
//    }
//}
